<?php

namespace MnsClient;

use AliyunMNS\Client;
use AliyunMNS\Exception\MnsException;
use AliyunMNS\Model\SendMessageRequestItem;
use AliyunMNS\Requests\BatchSendMessageRequest;
use AliyunMNS\Requests\CreateQueueRequest;
use AliyunMNS\Requests\PublishMessageRequest;
use AliyunMNS\Requests\SendMessageRequest;
use AliyunMNS\Requests\BatchReceiveMessageRequest;
use Hyperfx\Framework\Logger\Logx;

class MnsClient {

    public function __construct(protected Client $client,protected array $config)
    {
    }

    /**
     * 发送命令任务
     *
     * @param string $commandName 命令名称
     * @param array $args 参数 格式：['--key', 'value']
     * @param int $delaySeconds
     */
    public function sendCommandTask($commandName, array $args = [], $delaySeconds = 0, string $queueName = null): bool {
        $queueName = $queueName ?: $this->config['command_queue_name'];
        foreach ($args as &$arg) {
            if (!is_string($arg)) {
                $arg = (string) $arg;
            }
        }
        $messageBody = [
            'command' => $commandName,
            'args' => $args
        ];
        $jsonMessageBody = json_encode($messageBody);
        if (false === $jsonMessageBody) {
            Logx::get()->alert('发送消息队列失败', [
                'queueName' => $queueName,
                'messageBody' => $messageBody,
                'delaySeconds' => $delaySeconds,
            ]);
            return false;
        }
        return $this->sendMessage($queueName, $jsonMessageBody, $delaySeconds);
    }

    /**
     * 批量发送命令任务
     *
     * @param string $commandName 命令名称
     * @param array $commands 参数 格式：[['command': 'demo:command', args: ['--key', 'value']], ['command': 'demo:command', args: ['--key', 'value']]]
     */
    public function batchSendCommandTask(array $commands, $delaySeconds = 0, int $priority = 0, string $queueName = null): bool {
        $queueName = $queueName ?: $this->config['command_queue_name'];

        $arrItems = [];
        foreach ($commands as $command) {
            foreach ($command['args'] as &$arg) {
                if (!is_string($arg)) {
                    $arg = (string) $arg;
                }
            }
            $jsonMessageBody = json_encode($command);
            if (false === $jsonMessageBody) {
                Logx::get()->alert('发送消息队列失败', [
                    'queueName' => $queueName,
                    'messageBody' => $command,
                    'delaySeconds' => $delaySeconds,
                ]);
                return false;
            }

            $arrItems[] = [
                'messageBody' => $jsonMessageBody,
                'delaySeconds' => $delaySeconds,
                'priority' => $priority,
            ];
        }

        return $this->batchSendMessage($queueName, $arrItems);
    }


    #创建队列
    public function createQueueRequest($queueName)
    {
        $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);

        $request = new CreateQueueRequest($queueName);
        try
        {
            $res = $this->client->createQueue($request);
            return true;
        }
        catch (MnsException $e)
        {
            // 4.可能因为网络错误，或者Queue已经存在等原因导致CreateQueue失败，这里CatchException并做对应的处理。
            return false;
        }
    }

    #发送消息
    /**
     * @param     $queueName   //队列名称
     * @param     $messageBody //消息主体
     * @param int $DelaySeconds //延迟消费时间
     * @param int $Priority //优先级
     *
     * @return bool
     */
    public function sendMessage($queueName,$messageBody,$DelaySeconds=0,$Priority=0)
    {
        $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
        // 1.首先获取Queue的实例。
        // PHP SDK默认会对发送的消息做Base64 Encode，对接收到的消息做Base64 Decode。
        // 如果不希望SDK做这样的Base64操作，可以在getQueueRef的时候，传入参数$base64=FALSE。即$queue = $this->client->getQueueRef($queueName, FALSE);
        $queue = $this->client->getQueueRef($queueName,false);

        //$messageBody = "test";
        // 2.生成一个SendMessageRequest对象。
        // SendMessageRequest对象本身也包含了DelaySeconds和Priority属性可以设置。
        // 对于Message的属性，请参见QueueMessage。
        //$bodyMD5 = md5(base64_encode($messageBody));
        $request = new SendMessageRequest($messageBody);
        if(!empty($DelaySeconds))
        {
            $request->setDelaySeconds($DelaySeconds);
        }
        if(!empty($Priority))
        {
            $request->setPriority($Priority);
        }
        try
        {
            $queue->sendMessage($request);
            // 3.消息发送成功。
            return true;
        }
        catch (MnsException $e)
        {
            Logx::get()->alert($queueName.'|'.$messageBody.'|'.$e->getMessage());
            // 4.可能因为网络错误，或MessageBody过大等原因造成发送消息失败，这里CatchException并做对应的处理。
            return false;
        }
    }

    public function sendTopic($topicName, $messageTag, $messageBody)
    {
        $topicName = sprintf('%s%s', $this->config['queue_name_prefix'], $topicName);
        // 1.首先获取Queue的实例。
        // PHP SDK默认会对发送的消息做Base64 Encode，对接收到的消息做Base64 Decode。
        // 如果不希望SDK做这样的Base64操作，可以在getQueueRef的时候，传入参数$base64=FALSE。即$queue = $this->client->getQueueRef($queueName, FALSE);
        $topic = $this->client->getTopicRef($topicName,false);

        //$messageBody = "test";
        // 2.生成一个SendMessageRequest对象。
        // SendMessageRequest对象本身也包含了DelaySeconds和Priority属性可以设置。
        // 对于Message的属性，请参见QueueMessage。
        //$bodyMD5 = md5(base64_encode($messageBody));
        $request = new PublishMessageRequest($messageBody);
        $request->setMessageTag($messageTag);
        $request->setMessageBody($messageBody);
        try
        {
            $topic->publishMessage($request);
            // 3.消息发送成功。
            return true;
        }
        catch (MnsException $e)
        {
            Logx::get()->alert($topicName.'|'.$messageBody.'|'.$e->getMessage());
            // 4.可能因为网络错误，或MessageBody过大等原因造成发送消息失败，这里CatchException并做对应的处理。
            return false;
        }
    }

    /**
     * @param     $queueName
     * @param     $receiptHandle
     * @param int $visibilityTimeout
     *
     * @return bool
     */
    public function changeMessageVisibility($queueName,$receiptHandle,int $visibilityTimeout):bool
    {
        try
        {
            $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
            $queue = $this->client->getQueueRef($queueName);
            // 调用ChangeMessageVisibility接口，修改被消费过并且还处于Inactive状态的消息与其下次可被消费的时间间隔。
            $res = $queue->changeMessageVisibility($receiptHandle,$visibilityTimeout);
            return true;
        }
        catch (MnsException $e)
        {
            Logx::get()->alert($queueName.'|'.$receiptHandle.'|'.$e->getMessage());
            return false;
        }
    }

    #接收消息
    public function receiveMessage($queueName,$waitseconds=null, bool $base64 = false, $isReturnOriginFormat = false)
    {
        try
        {
            $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
            $queue = $this->client->getQueueRef($queueName,$base64);
            // 1.直接调用receiveMessage函数。
            // receiveMessage函数接受waitSeconds参数，无特殊情况建议设置为30。
            // waitSeconds非0表示这次receiveMessage是一次http long polling，如果queue内没有message，那么这次request会在server端等到queue内有消息才返回。最长等待时间为waitSeconds的值，最大为30。
            $res = $queue->receiveMessage($waitseconds);
            if ($isReturnOriginFormat) {
                return $res;
            }
            return [
                'MessageId'=>$res->getMessageId(),//消息编号，在一个Queue中唯一。
                'MessageBody'=>$res->getMessageBody(),//消息正文
                'ReceiptHandle'=>$res->getReceiptHandle(),//本次获取消息产生的临时句柄，用于删除和修改处于Inactive消息，NextVisibleTime之前有效。
                'EnqueueTime'=>$res->getEnqueueTime(),//消息发送到队列的时间
                'FirstDequeueTime'=>$res->getFirstDequeueTime(),//消息第一次消费时间
                'NextVisibleTime'=>$res->getNextVisibleTime(),
                'DequeueCount'=>$res->getDequeueCount(),//总共被消费的次数。
                'Priority'=>$res->getPriority(),//消息的优先级权值
            ];
        }
        catch (MnsException $e)
        {
            if(!strstr($e->getMessage(),'timed out')&&!strstr($e->getMessage(),'Message not exist'))
            {
                Logx::get()->alert($queueName.'|'.$e->getMessage());
            }
            // 3.和CreateQueue和SendMessage一样，ReceiveMessage也有可能出错，所以加上CatchException并做对应的处理。
            return false;
        }
    }

    #接收消息
    public function deleteMessage($queueName,$receiptHandle)
    {
        try
        {
            $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
            $queue = $this->client->getQueueRef($queueName);
            // 1.直接调用receiveMessage函数。
            // receiveMessage函数接受waitSeconds参数，无特殊情况建议设置为30。
            // waitSeconds非0表示这次receiveMessage是一次http long polling，如果queue内没有message，那么这次request会在server端等到queue内有消息才返回。最长等待时间为waitSeconds的值，最大为30。
            $res = $queue->deleteMessage($receiptHandle);
            return true;
        }
        catch (MnsException $e)
        {
            Logx::get()->alert($queueName.'|'.$receiptHandle.'|'.$e->getMessage());
            // 3.和CreateQueue和SendMessage一样，ReceiveMessage也有可能出错，所以加上CatchException并做对应的处理。
            return false;
        }
    }

    #批量接收消息
    public function batchReceiveMessage($queueName,$numOfMessages,$waitseconds=null)
    {
        try
        {
            $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
            $queue = $this->client->getQueueRef($queueName,false);
            $request=new BatchReceiveMessageRequest($numOfMessages,$waitseconds);
            $res = $queue->batchReceiveMessage($request);
            $final=[];
            foreach($res->getMessages() as $message){
                $final[]= [
                    'MessageId'=>$message->getMessageId(),//消息编号，在一个Queue中唯一。
                    'MessageBody'=>$message->getMessageBody(),//消息正文
                    'ReceiptHandle'=>$message->getReceiptHandle(),//本次获取消息产生的临时句柄，用于删除和修改处于Inactive消息，NextVisibleTime之前有效。
                    'EnqueueTime'=>$message->getEnqueueTime(),//消息发送到队列的时间
                    'FirstDequeueTime'=>$message->getFirstDequeueTime(),//消息第一次消费时间
                    'NextVisibleTime'=>$message->getNextVisibleTime(),
                    'DequeueCount'=>$message->getDequeueCount(),//总共被消费的次数。
                    'Priority'=>$message->getPriority(),//消息的优先级权值
                ];
            }
            return $final;
        }
        catch (MnsException $e)
        {
            if(!strstr($e->getMessage(),'timed out')&&!strstr($e->getMessage(),'Message not exist'))
            {
                Logx::get()->alert($queueName.'|'.$e->getMessage());
            }
            // 3.和CreateQueue和SendMessage一样，ReceiveMessage也有可能出错，所以加上CatchException并做对应的处理。
            return false;
        }
    }

    #批量删除消息
    public function batchDeleteMessage($queueName,array $receiptHandles)
    {
        try
        {
            $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
            $queue = $this->client->getQueueRef($queueName);
            $res = $queue->batchDeleteMessage($receiptHandles);
            if(!$res->isSucceed()){
                throw new MnsException($res->getStatusCode(),'删除队列失败');
            }
            return true;
        }
        catch (MnsException $e)
        {
            Logx::get()->alert($queueName.'|'.json_encode($receiptHandles).'|'.$e->getMessage());
            // 3.和CreateQueue和SendMessage一样，ReceiveMessage也有可能出错，所以加上CatchException并做对应的处理。
            return false;
        }
    }

    /**
     * @param string $queueName //队列名称
     * @param array $messages //消息主体
     *
     * @return bool
     */
    public function batchSendMessage(string $queueName, array $messages): bool
    {
        $queueName = sprintf('%s%s', $this->config['queue_name_prefix'], $queueName);
        try {
            $queue = $this->client->getQueueRef($queueName, false);

            $items = [];
            foreach ($messages as $msg) {
                $item = new SendMessageRequestItem(
                    $msg['messageBody'],
                    $msg['delaySeconds'] ?? 0,
                    $msg['priority'] ?? 0,
                );
                $items[] = $item;
            }
            $request = new BatchSendMessageRequest($items);
            $queue->batchSendMessage($request);
            return true;
        } catch (MnsException $e) {
            Logx::get()->alert($queueName . '|' . $e->getMessage(), $messages);
            return false;
        }
    }
}
